
Remove report and safe from Worker.close #6363

Merged

8 commits merged from the server_close_refactor branch into dask:main on May 20, 2022

Conversation

@fjetter (Member) commented May 18, 2022

  1. This change removes the report flag and the unused safe flag from Worker.close.

The report flag is a recurring problem and, from what I understand, the reason for #6320 (comment).

Reporting is problematic because it opens a dedicated RPC connection, which may no longer be possible at that point and then blocks for timeouts.connect seconds.
I am not entirely convinced that reporting itself is even necessary outside of unit tests. Regardless, we have an implicit stream report built in by now via the worker status update call.

  2. It also removes the close_workers flag from Scheduler.close.

Regardless of the close_workers flag, Scheduler.close always terminates all connected workers and their nannies, assuming there is a stream_comm still around. Given the assumption that set(stream_comms) == set(self.workers), this flag is redundant and only triggers multiple close requests on the remote workers (see the sketch below).
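
For illustration only, here is a minimal toy sketch of that shutdown path. The _ToyScheduler/_FakeComm names are made up for this example and are not distributed classes; the message shapes follow the diffs quoted later in this thread.

import asyncio


class _FakeComm:
    """Stand-in for a worker's batched stream comm; records what would be sent."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    def send(self, msg):
        self.sent.append(msg)


class _ToyScheduler:
    """Toy model only: every connected worker has a stream comm, so looping
    over stream_comms already reaches all workers (and, through them, their
    nannies); a separate close_workers broadcast would just send duplicates."""

    def __init__(self, worker_addresses):
        # Invariant assumed in the PR description:
        # set(stream_comms) == set(self.workers)
        self.stream_comms = {w: _FakeComm(w) for w in worker_addresses}
        self.workers = set(worker_addresses)

    async def close(self):
        for _, comm in list(self.stream_comms.items()):
            # One terminate request per worker is enough; the worker takes
            # its nanny down as part of handling it.
            comm.send({"op": "terminate"})
            comm.send({"op": "close-stream"})


async def _demo():
    s = _ToyScheduler(["tcp://w1:1234", "tcp://w2:1234"])
    await s.close()
    assert all(c.sent == [{"op": "terminate"}, {"op": "close-stream"}]
               for c in s.stream_comms.values())


asyncio.run(_demo())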

Closes #6320

Note: The way we're using close is a bit confusing. The base class Server uses close, but all subclasses overwrite it and introduce other signatures. The defaults for these signatures are not always the "robust" choice. For instance, reporting by default can block unnecessarily, which is something we should avoid. I don't think we should have all these toggles in there. We are likely looking for a fast or force flag, but not for many individual flags like wait_on_executor, report, close_workers, remove, etc. Introducing a fast or force flag everywhere would be my preference, but I'm trying to limit the scope of this PR.
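
To make that idea concrete, here is a rough, hypothetical sketch of what a single force toggle could look like. _ToyServer and its helper methods are illustrative only, not a proposed API.

import asyncio


class _ToyServer:
    """Illustration of one close(force=...) entry point instead of many
    per-subclass toggles (wait_on_executor, report, close_workers, ...)."""

    def __init__(self):
        self.closed = False

    async def close(self, force=False, timeout=5.0):
        if force:
            # Fast path: never wait on a peer that may already be gone.
            self._abort_connections()
        else:
            # Graceful path: bounded wait for in-flight work, then tear down.
            try:
                await asyncio.wait_for(self._drain(), timeout)
            except asyncio.TimeoutError:
                self._abort_connections()
        self.closed = True

    async def _drain(self):
        await asyncio.sleep(0)  # placeholder for flushing streams/executors

    def _abort_connections(self):
        pass  # placeholder for dropping comms without waiting


asyncio.run(_ToyServer().close(force=True))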

TODO

@github-actions bot (Contributor) commented May 18, 2022

Unit Test Results

    15 files       15 suites    6h 53m 11s ⏱️
 2 802 tests    2 721 ✔️     79 💤     2 ❌
20 774 runs    19 843 ✔️    928 💤     3 ❌

For more details on these failures, see this check.

Results for commit 1e260fa.

♻️ This comment has been updated with latest results.

@fjetter (Member, Author) commented May 19, 2022

Edit: Got it


I am struggling to write a proper reproducer for #6320

A simple way of reproducing this without any patching is to provoke some kind of exception during startup, e.g.

# Start a scheduler. This will typically listen on 8786
dask-scheduler
# Now try to start the worker on the same port as the scheduler.
# This should raise and tear down the process, which does not happen properly on main
dask-worker 127.0.0.1:8786 --worker-port 8786

@fjetter fjetter force-pushed the server_close_refactor branch from 1259504 to 9e343f0 Compare May 19, 2022 09:21
@fjetter fjetter self-assigned this May 19, 2022
@jakirkham (Member) commented:

@pentschev would you be able to look at this/test it?

@pentschev (Member) commented:

Thanks @fjetter for working on this fix. I tested it with the original test I mentioned, and it fixed the issue. However, we have a similar one for LocalCUDACluster, which inherits from LocalCluster, and that still fails. I was able to write a test for it below; it passes when picking a commit before #5910, but hangs on main and, with the changes from this PR, raises RuntimeError: Worker failed to start. instead of the plugin error.

import pytest

from dask.distributed import LocalCluster, Nanny


class MyPlugin:
    def setup(self, worker=None):
        import my_nonexistent_library  # noqa


# Intentionally not using @gen_test to skip cleanup checks
@pytest.mark.asyncio  # marker needed so pytest actually runs the coroutine
async def test_localcluster_start_exception():
    class MyCluster(LocalCluster):
        def __init__(self):
            super().__init__(
                n_workers=0,
                asynchronous=True,
            )

            self.scale(3)

        def new_worker_spec(self):
            i = len(self.worker_spec)
            return {i: {"cls": Nanny, "options": {"plugins": {MyPlugin()}}}}

    with pytest.raises(ModuleNotFoundError):
        await MyCluster()

@graingert (Member) commented May 20, 2022

We are likely looking for a fast or force flag, but not for many individual flags like wait_on_executor, report, close_workers, remove, etc. Introducing a fast or force flag everywhere would be my preference, but I'm trying to limit the scope of this PR.

One pattern is to follow the abort()/close() split, which ends up looking a bit like:

async def __aexit__(self, *exc_info) -> None:
    if exc_info == (None, None, None):
        await self.close()
    else:
        self.abort()
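
Expanded into a self-contained toy for illustration (Resource and its method bodies are made up, not distributed code): a clean exit runs the graceful close(), while an exception inside the block triggers the non-blocking abort().

import asyncio


class Resource:
    """Minimal illustration of the close()/abort() split."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        if exc_info == (None, None, None):
            await self.close()  # graceful: may flush buffers and wait
        else:
            self.abort()        # immediate: never blocks on a broken peer

    async def close(self):
        print("closed gracefully")

    def abort(self):
        print("aborted")


async def main():
    async with Resource():
        pass  # clean exit -> close()
    try:
        async with Resource():
            raise RuntimeError("boom")  # error -> abort()
    except RuntimeError:
        pass


asyncio.run(main())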

@fjetter (Member, Author) commented May 20, 2022

@pentschev the test you provided does not hang for me; it just fails because it expects the wrong exception. There is a change in behavior: this will no longer raise an ImportError but instead always raise a RuntimeError whose cause is the ImportError. Prior to #5910 there was a lot of ambiguity around why a worker could not start up. Depending on whether you used a nanny, a worker, a timeout or no timeout, the exception type would be different. Now, it is always a RuntimeError with a cause. That's an intentional breaking change in #5910.
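
For illustration, a standalone sketch of what "RuntimeError with cause" means and how to assert on it. start_worker here is a made-up stand-in for the startup path, and raises_with_cause (from distributed.utils_test, if I have the location right) performs essentially the same check more compactly.

import pytest


def start_worker():
    """Made-up stand-in for the startup path: the plugin's ImportError is
    re-raised as the __cause__ of a RuntimeError."""
    try:
        import my_nonexistent_library  # noqa: F401
    except ImportError as e:
        raise RuntimeError("Worker failed to start.") from e


def test_cause_is_preserved():
    with pytest.raises(RuntimeError) as info:
        start_worker()
    # The original reason is still available for inspection:
    assert isinstance(info.value.__cause__, ImportError)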

Adjusting your example to use the new raises_with_cause util function makes the test pass. (gen_test fails because some RPCs are not properly cleaned up; that's an unrelated Spec/LocalCluster problem, I believe.)

import pytest

from dask.distributed import LocalCluster, Nanny
from distributed.utils_test import raises_with_cause


class MyPlugin:
    def setup(self, worker=None):
        import my_nonexistent_library  # noqa


# Intentionally not using @gen_test to skip cleanup checks
@pytest.mark.asyncio
async def test_localcluster_start_exception():
    class MyCluster(LocalCluster):
        def __init__(self):
            super().__init__(
                n_workers=0,
                asynchronous=True,
            )

            self.scale(3)

        def new_worker_spec(self):
            i = len(self.worker_spec)
            return {i: {"cls": Nanny, "options": {"plugins": {MyPlugin()}}}}

    with raises_with_cause(RuntimeError, None, ImportError, None):
        async with MyCluster():
            return

@fjetter fjetter force-pushed the server_close_refactor branch from 859e196 to 4e491e3 Compare May 20, 2022 13:06
@fjetter (Member, Author) commented May 20, 2022

I'll try to get the test in as well, but I don't like that it leaks resources. I'll have a quick look to see if this can be resolved easily.

Comment on lines 1171 to 1176
async def test_localcluster_start_exception():
    with raises_with_cause(RuntimeError, None, ImportError, None):
        async with LocalCluster(
            plugins={MyPlugin()},
        ):
            return

@fjetter (Member, Author) commented:

I added a simplified version of the LocalCluster test reproducer. The subclassing suggested in #6363 (comment) raises an exception during __init__, which means we never go through proper cleanup by the context manager, i.e. resources are never closed. The interesting bit, however, is the plugin for the workers, which can simply be passed through as a kwarg.

@fjetter (Member, Author) commented:

FWIW, failure scenarios for worker startup in a SpecCluster are a bit messy, see #5919.


@gen_test(
    clean_kwargs={
        # FIXME: This doesn't close the LoopRunner properly, leaving a thread around

@fjetter (Member, Author) commented:

@graingert this is leaving the thread of a LoopRunner alive. I didn't want to mess with it here, but it may interest you.

@fjetter (Member, Author) commented May 20, 2022

@quasiben @pentschev Can somebody confirm whether this works for you?

@fjetter (Member, Author) commented May 20, 2022

Both Ubuntu failures are #6395.
The OSX tests are still pending.

@fjetter (Member, Author) commented May 20, 2022

FWIW, I feel comfortable merging this without successful OSX builds if this is blocking our release procedure.

@jrbourbeau (Member) commented:

@quasiben @pentschev Can somebody confirm whether this works for you?

FWIW, I feel comfortable merging this without successful OSX builds if this is blocking our release procedure.

@quasiben @pentschev @jakirkham could you provide some feedback on whether this PR is good to go? My plan is to merge this in a bit and then release.

pentschev added a commit to pentschev/dask-cuda that referenced this pull request May 20, 2022
As of dask/distributed#6363, there is a change in behavior in how plugin errors are raised.

@pentschev (Member) commented:

Yes, this seems to work. I opened rapidsai/dask-cuda#914 to address this change in Dask-CUDA.

@jrbourbeau (Member) commented:

Great, thanks for confirming @pentschev. I'll merge this in after CI finishes so this is included in the release today

@jrbourbeau jrbourbeau merged commit 9bb999d into dask:main May 20, 2022

@gjoseph92 (Collaborator) left a review

I have a few concerns about this. I've marked the most urgent ones with ⚠️. Due to the plan to release today, we may want to consider reverting this until we can address them.

comm.send({"op": "close", "report": False})
# This closes the Worker and ensures that if a Nanny is around,
# it is closed as well
comm.send({"op": "terminate"})
comm.send({"op": "close-stream"})

@gjoseph92 (Collaborator) commented:

This close-stream is unnecessary. Worker.close will close the stream itself:

if self.batched_stream:
    with suppress(TimeoutError):
        await self.batched_stream.close(timedelta(seconds=timeout))

Doesn't hurt to leave it though if you want to play it safe.

@@ -3399,15 +3386,16 @@ async def close(self, fast=False, close_workers=False):
         logger.info("Scheduler closing all comms")

         futures = []
-        for w, comm in list(self.stream_comms.items()):
+        for _, comm in list(self.stream_comms.items()):

@gjoseph92 (Collaborator) commented:

Out of scope, but I'm curious why we don't just use Scheduler.remove_worker here. It's going to run anyway when the stream comms close:

try:
    await self.handle_stream(comm=comm, extra={"worker": worker})
finally:
    if worker in self.stream_comms:
        worker_comm.abort()
        await self.remove_worker(address=worker, stimulus_id=stimulus_id)

When we fix up remove_worker to handle closing the comms, and to work properly concurrently #6390, we'll probably want to have exactly one way to close and remove a worker, and reuse it here.

            elif ws.status == Status.paused:
                self.running.remove(ws)
            elif ws.status == Status.closing:
                await self.remove_worker(

@gjoseph92 (Collaborator) commented:

⚠️ This feels like a substantive change that's built around the partially-removed state: #6390. What's the motivation for this? Unless this is strictly necessary, I'd be more comfortable leaving it out, since we know there are weird race conditions around remove_worker(..., close=False) #6354, though they're lessened by having removed reconnect.

@fjetter (Member, Author) commented:

FWIW, once we're in this state, the worker is already in Worker.close. Worker.close will eventually close the stream once it is done.

I am very certain that I added this code path to ensure tests pass. I believe it was connected to a test in deploy, but I do not recall what the problem was exactly.
I remember it being connected to removing the worker from self.running without properly removing everything else.

@@ -1704,6 +1707,7 @@ def check_thread_leak():

         bad_thread = bad_threads[0]
         call_stacks = profile.call_stack(sys._current_frames()[bad_thread.ident])
+        breakpoint()

@gjoseph92 (Collaborator) commented:

Suggested change:
-        breakpoint()

⚠️

         except OSError as e:
             # Scheduler is gone. Respect distributed.comm.timeouts.connect
             if "Timed out trying to connect" in str(e):
                 logger.info("Timed out while trying to connect during heartbeat")
-                await self.close(report=False)
+                await self.close()

@gjoseph92 (Collaborator) commented:

Suggested change:
-                await self.close()
+                await self.terminate()

⚠️ If the scheduler is gone, but there's a Nanny, I believe this will make the Nanny restart the worker in an infinite loop?

@gjoseph92 (Collaborator) commented:

Okay, it's not an infinite restart loop (just because Worker._register_with_scheduler will actually never give up trying to connect to the scheduler), but it does mean the worker will restart if the scheduler crashes and wait forever for it to show up.

In 3 terminals, run:

dask-scheduler & echo $!

then

DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT="1s" dask-worker localhost:8786 --nanny

then

sudo kill -9 <scheduler PID printed in first terminal>

The scheduler will die without sending the terminate signal to workers. The worker will close, the nanny will restart it, and it will then loop forever trying to connect to the nonexistent scheduler.

Is this acceptable? You could argue that if the scheduler crashes, maybe something should bring a new one back up, so it makes sense to restart. But that's assuming a lot about your deployment system, and it should probably be the deployment system's job to do that restarting. To me this is a regression that could lead to wasted resources for anyone counting on the current behavior if the scheduler crashes.

Comment on lines +3422 to +3423
ws = self.workers[worker]
self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)})

@gjoseph92 (Collaborator) commented:

Suggested change:
-            ws = self.workers[worker]
-            self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)})
+            self.worker_send(worker, {"op": "terminate"})

The worker can figure out what to do with the nanny on its own.

I'd like to use the new semantics and interfaces properly. terminate and close are similar-sounding words, but they now mean very different things: terminate means actually shut down; close means restart if there's a nanny, otherwise shut down.

I don't think the naming of these is clear enough, but what we want to do here, semantically, is terminate, so we should use that interface directly.
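
For illustration only, here is a toy dispatcher that encodes the distinction described above. ToyWorker and NannyAction are made-up names, not the actual distributed handlers.

import asyncio
from enum import Enum, auto


class NannyAction(Enum):
    RESTART_WORKER = auto()
    SHUT_DOWN = auto()


class ToyWorker:
    """Toy model: "terminate" always shuts everything down, while "close"
    only stops this worker process and leaves a nanny free to restart it."""

    def __init__(self, has_nanny):
        self.has_nanny = has_nanny

    async def handle(self, op):
        if op == "terminate":
            await self._stop_process()
            return NannyAction.SHUT_DOWN
        if op == "close":
            await self._stop_process()
            return NannyAction.RESTART_WORKER if self.has_nanny else NannyAction.SHUT_DOWN
        raise ValueError(f"unknown op {op!r}")

    async def _stop_process(self):
        await asyncio.sleep(0)  # placeholder for the actual teardown


async def _demo():
    assert await ToyWorker(has_nanny=True).handle("close") is NannyAction.RESTART_WORKER
    assert await ToyWorker(has_nanny=True).handle("terminate") is NannyAction.SHUT_DOWN


asyncio.run(_demo())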

@@ -4183,7 +4171,7 @@ async def remove_worker(self, address, stimulus_id, safe=False, close=True):
         logger.info("Remove worker %s", ws)
         if close:
             with suppress(AttributeError, CommClosedError):
-                self.stream_comms[address].send({"op": "close", "report": False})
+                self.stream_comms[address].send({"op": "close"})

@gjoseph92 (Collaborator) commented:

⚠️ This will now cause workers with nannies to restart, not shut down. Scheduler.remove_worker is called in many places. You could argue that most of them warrant a restart, since it's in response to a comm error or something, but are we sure about that?

Here at least, it doesn't (though this case should be changed anyway #6227):

if remove:
    await self.remove_worker(
        address=ws.address, safe=True, stimulus_id=stimulus_id
    )

@@ -1219,7 +1220,7 @@ async def heartbeat(self):
             logger.error(
                 f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
             )
-            await self.close(report=False)
+            await self.close()

@gjoseph92 (Collaborator) commented:

This and the following self.close() calls in handle_scheduler effectively do #6387. I wanted to do that a bit more carefully and add tests.

self,
timeout=30,
executor_wait=True,
nanny=False,

@gjoseph92 (Collaborator) commented:

⚠️ I'm concerned about flipping this default. Most of my comments here are around that. I think there are a number of places where close is being used without passing nanny=True explicitly (both in worker.py and in scheduler.py), and I am not convinced that they're all situations in which the worker process should be restarted if there's a nanny around.

I'm a little worried about this exacerbating problems with Scheduler.remove_worker (#6390), but I'm most concerned about verifying that it doesn't create an infinite restart loop.

@jakirkham (Member) commented:

JFYI it sounds like we are delaying the release until the review comments above are addressed

xref: dask/community#245 (comment)

@fjetter (Member, Author) commented May 23, 2022

#6417

Successfully merging this pull request may close these issues:

dask-worker process remains alive after Nanny exception on plugins=